/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/***********************************************************************
 *   $Id: dmlpackageprocessor.cpp 9673 2013-07-09 15:59:49Z chao $
 *
 *
 ***********************************************************************/
#include "dmlpackageprocessor.h"

#include <math.h>
using namespace std;

#include <boost/algorithm/string/case_conv.hpp>
using namespace boost::algorithm;
#include <boost/tokenizer.hpp>
#include <boost/shared_ptr.hpp>

#include "we_messages.h"
using namespace WriteEngine;
using namespace dmlpackage;
#include "calpontselectexecutionplan.h"
#include "simplecolumn.h"
#include "constantcolumn.h"
#include "simplefilter.h"
#include "constantfilter.h"
#include "columnresult.h"
using namespace execplan;
using namespace logging;
#include "configcpp.h"
using namespace config;
#include "joblistfactory.h"
#include "joblist.h"
#include "distributedenginecomm.h"
using namespace joblist;
#include "bytestream.h"
#include "messagequeue.h"
using namespace messageqcpp;
#include "tablelockdata.h"
#include "exceptclasses.h"

namespace
{
using namespace execplan;

const SOP opeq(new Operator("="));
const SOP opne(new Operator("<>"));
const SOP opor(new Operator("or"));
const SOP opand(new Operator("and"));
}  // namespace

namespace dmlpackageprocessor
{
DMLPackageProcessor::~DMLPackageProcessor()
{
  // cout << "In DMLPackageProcessor destructor " << this << endl;
  if (fWEClient)
    delete fWEClient;

  if (fExeMgr)
    delete fExeMgr;
}

//@bug 397
void DMLPackageProcessor::cleanString(string& s)
{
  string::size_type pos = s.find_first_not_of(" ");

  // stripe off space and ' or '' at beginning and end
  if (pos < s.length())
  {
    s = s.substr(pos, s.length() - pos);

    if ((pos = s.find_last_of(" ")) < s.length())
    {
      s = s.substr(0, pos);
    }
  }

  if (s[0] == '\'')
  {
    s = s.substr(1, s.length() - 2);

    if (s[0] == '\'')
      s = s.substr(1, s.length() - 2);
  }
}
#if 0
boost::any DMLPackageProcessor::tokenizeData( execplan::CalpontSystemCatalog::SCN txnID,
        execplan::CalpontSystemCatalog::ColType colType,
        const std::string& data, DMLResult& result, bool isNULL )
{
    SUMMARY_INFO("DMLPackageProcessor::tokenizeData");

    bool retval = true;
    boost::any value;

    if (isNULL)
    {
        WriteEngine::Token nullToken;
        value = nullToken;
    }
    else
    {
        if ( data.length() > (unsigned int)colType.colWidth )
        {
            retval = false;
            // build the logging message
            logging::Message::Args args;
            logging::Message message(6);
            args.add("Insert value is too large for colum ");
            message.format( args );

            result.result = INSERT_ERROR;
            result.message = message;
        }
        else
        {
            //Tokenize the data value
            WriteEngine::DctnryStruct dictStruct;
            dictStruct.dctnryOid = colType.ddn.dictOID;
            //cout << "Dictionary OIDs: " << colType.ddn.treeOID << " " << colType.ddn.listOID << endl;
            WriteEngine::DctnryTuple  dictTuple;
            dictTuple.sigValue = data.c_str();
            dictTuple.sigSize = data.length();
            int error = NO_ERROR;

            if ( NO_ERROR != (error = fWriteEngine.tokenize( txnID, dictStruct, dictTuple)) )
            {
                retval = false;
                //cout << "Error code from WE: " << error << endl;
                // build the logging message
                logging::Message::Args args;
                logging::Message message(1);
                args.add("Tokenization failed on: ");
                args.add(data);
                args.add("error number: ");
                args.add( error );
                message.format( args );

                result.result = TOKEN_ERROR;
                result.message = message;
            }

            WriteEngine::Token aToken = dictTuple.token;
            value = aToken;
        }
    }

    return value;
}
#endif
void DMLPackageProcessor::getColumnsForTable(uint32_t sessionID, std::string schema, std::string table,
                                             dmlpackage::ColumnList& colList)
{
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = table;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(tableName, true);

  CalpontSystemCatalog::RIDList::const_iterator rid_iterator = ridList.begin();

  while (rid_iterator != ridList.end())
  {
    CalpontSystemCatalog::ROPair roPair = *rid_iterator;
    DMLColumn* columnPtr = new DMLColumn();
    CalpontSystemCatalog::TableColName tblColName = systemCatalogPtr->colName(roPair.objnum);
    columnPtr->set_Name(tblColName.column);

    colList.push_back(columnPtr);

    ++rid_iterator;
  }
}

char* DMLPackageProcessor::strlower(char* in)
{
  char* p = in;

  if (p)
  {
    while (*p)
    {
      *p = tolower(*p);
      p++;
    }
  }

  return in;
}

void DMLPackageProcessor::convertRidToColumn(uint64_t& rid, unsigned& dbRoot, unsigned& partition,
                                             unsigned& segment, unsigned filesPerColumnPartition,
                                             unsigned extentsPerSegmentFile, unsigned extentRows,
                                             unsigned startDBRoot, unsigned dbrootCnt,
                                             const unsigned startPartitionNum)
{
  partition = rid / (filesPerColumnPartition * extentsPerSegmentFile * extentRows);

  segment = (((rid % (filesPerColumnPartition * extentsPerSegmentFile * extentRows)) / extentRows)) %
            filesPerColumnPartition;

  dbRoot = ((startDBRoot - 1 + segment) % dbrootCnt) + 1;

  // Calculate the relative rid for this segment file
  uint64_t relRidInPartition = rid - ((uint64_t)partition * (uint64_t)filesPerColumnPartition *
                                      (uint64_t)extentsPerSegmentFile * (uint64_t)extentRows);
  idbassert(relRidInPartition <=
            (uint64_t)filesPerColumnPartition * (uint64_t)extentsPerSegmentFile * (uint64_t)extentRows);
  uint32_t numExtentsInThisPart = relRidInPartition / extentRows;
  unsigned numExtentsInThisSegPart = numExtentsInThisPart / filesPerColumnPartition;
  uint64_t relRidInThisExtent = relRidInPartition - numExtentsInThisPart * extentRows;
  rid = relRidInThisExtent + numExtentsInThisSegPart * extentRows;
}

string DMLPackageProcessor::projectTableErrCodeToMsg(uint32_t ec)
{
  if (ec < 1000)  // pre IDB error code
  {
    ErrorCodes ecObj;
    string errMsg("Statement failed.");
    errMsg += ecObj.errorString(ec).substr(150);  // substr removes ErrorCodes::fPreamble
    return errMsg;
  }

  // IDB error
  return IDBErrorInfo::instance()->errorMsg(ec);
}

bool DMLPackageProcessor::validateVarbinaryVal(std::string& inStr)
{
  bool invalid = false;

  for (unsigned i = 0; i < inStr.length(); i++)
  {
    if (!isxdigit(inStr[i]))
    {
      invalid = true;
      break;
    }
  }

  return invalid;
}

int DMLPackageProcessor::commitTransaction(uint64_t uniqueId, BRM::TxnID txnID)
{
  int rc = fDbrm->vbCommit(txnID.id);
  return rc;
}

// Tries to rollback transaction, if network error tries one more time
// MCOL-5263.
int32_t DMLPackageProcessor::tryToRollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
                                                      string& errorMsg)
{
  auto weRc = rollBackTransaction(uniqueId, txnID, sessionID, errorMsg);
  if (weRc)
  {
    weRc = rollBackTransaction(uniqueId, txnID, sessionID, errorMsg);
    if (weRc == 0)
    {
      // Setup connection in WE with PS.
      joblist::ResourceManager* rm = joblist::ResourceManager::instance(true);
      joblist::DistributedEngineComm* fEc = joblist::DistributedEngineComm::instance(rm);
      weRc = fEc->Setup();
    }
  }

  return weRc;
}

DMLPackageProcessor::DMLResult DMLPackageProcessor::processPackage(dmlpackage::CalpontDMLPackage& cpackage)
{
  auto result = processPackageInternal(cpackage);
  uint32_t tries = 0;
  // Try to setup connection and process package one more time.
  while ((result.result == PP_LOST_CONNECTION) && (tries < 5))
  {
    std::cerr << "DMLPackageProcessor: NETWORK ERROR; attempt # " << tries << std::endl;
    joblist::ResourceManager* rm = joblist::ResourceManager::instance(true);
    joblist::DistributedEngineComm* fEc = joblist::DistributedEngineComm::instance(rm);
    if (fEc->Setup())
      return result;

    result = processPackageInternal(cpackage);
    ++tries;
  }
  return result;
}

bool DMLPackageProcessor::checkPPLostConnection(std::exception& ex)
{
  std::string error = ex.what();
  return error.find(PPLostConnectionErrorCode) != std::string::npos;
}

int DMLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
                                             std::string& errorMsg)
{
  std::vector<BRM::LBID_t> lbidList;
  std::vector<BRM::LBIDRange> lbidRangeList;
  BRM::LBIDRange range;
  int rc = 0;
  // Check BRM status before processing.
  rc = fDbrm->isReadWrite();

  if (rc != 0)
  {
    std::string brmMsg;
    errorMsg = "Can't read DBRM isReadWrite [ ";
    BRM::errString(rc, brmMsg);
    errorMsg += brmMsg;
    errorMsg += "]";
    return rc;
  }

  ByteStream bytestream;
  fWEClient->addQueue(uniqueId);
  // cout << "adding to queue with id " << uniqueId << endl;
  bytestream << (ByteStream::byte)WE_SVR_ROLLBACK_BLOCKS;
  bytestream << uniqueId;
  bytestream << sessionID;
  bytestream << (uint32_t)txnID.id;
  uint32_t msgRecived = 0;

  try
  {
    fWEClient->write_to_all(bytestream);
    boost::shared_ptr<messageqcpp::ByteStream> bsIn;
    bsIn.reset(new ByteStream());
    ByteStream::byte tmp8;

    while (1)
    {
      if (msgRecived == fWEClient->getPmCount())
        break;

      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        errorMsg = "Network error reading WEClient";
        fWEClient->removeQueue(uniqueId);
        // cout << "erroring out remove queue id " << uniqueId << endl;
        break;
      }
      else
      {
        *bsIn >> tmp8;
        rc = tmp8;

        if (rc != 0)
        {
          char szrc[20];
          *bsIn >> errorMsg;
          errorMsg += " (WriteEngine returns error ";
          sprintf(szrc, "%d", rc);
          errorMsg += szrc;
          errorMsg += ")";
          fWEClient->removeQueue(uniqueId);
          cout << "erroring out remove queue id " << uniqueId << endl;
          break;
        }
        else
          msgRecived++;
      }
    }
  }
  catch (std::exception& e)
  {
    rc = NETWORK_ERROR;
    errorMsg = "Network error occurred when rolling back blocks";
    errorMsg += e.what();
    fWEClient->removeQueue(uniqueId);
    cout << "erroring out remove queue id " << uniqueId << endl;
    // delete fWEClient;
    return rc;
  }
  catch (...)
  {
    rc = NETWORK_ERROR;
    errorMsg = "Unknown exception caught while rolling back transaction.";
    fWEClient->removeQueue(uniqueId);
    cout << "erroring out remove queue id " << uniqueId << endl;
    // delete fWEClient;
    return rc;
  }

  if (rc != 0)
  {
    // delete fWEClient;
    return rc;
  }

  fWEClient->removeQueue(uniqueId);
  // delete fWEClient;
  //	cout << "success. remove queue id " << uniqueId << endl;
  rc = fDbrm->getUncommittedLBIDs(txnID.id, lbidList);

  if (rc != 0)
  {
    std::string brmMsg;
    errorMsg = "DBRM getUncommittedLBIDs [ ";
    BRM::errString(rc, brmMsg);
    errorMsg += brmMsg;
    errorMsg += "]";
    return rc;
  }

  for (size_t i = 0; i < lbidList.size(); i++)
  {
    range.start = lbidList[i];
    range.size = 1;
    lbidRangeList.push_back(range);
  }

  rc = fDbrm->vbRollback(txnID.id, lbidRangeList);

  if (rc != 0)
  {
    std::string brmMsg;
    errorMsg = "DBRM vbRollback [ ";
    BRM::errString(rc, brmMsg);
    errorMsg += brmMsg;
    errorMsg += "]";
    return rc;
  }

  return rc;
}

int DMLPackageProcessor::commitBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID,
                                                      const uint32_t tableOid, std::string& errorMsg)
{
  // collect hwm info from all pms and set them here. remove table metadata if all successful
  ByteStream bytestream;
  fWEClient->addQueue(uniqueId);
  bytestream << (ByteStream::byte)WE_SVR_COMMIT_BATCH_AUTO_ON;
  bytestream << uniqueId;
  bytestream << (uint32_t)txnID.id;
  bytestream << tableOid;
  bytestream << fSessionID;

  uint32_t msgRecived = 0;
  fWEClient->write_to_all(bytestream);
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  bsIn.reset(new ByteStream());
  int rc = 0;
  ByteStream::byte tmp8;
  typedef std::vector<BRM::BulkSetHWMArg> BulkSetHWMArgs;
  std::vector<BulkSetHWMArgs> hwmArgsAllPms;

  while (1)
  {
    if (msgRecived == fWEClient->getPmCount())
      break;

    fWEClient->read(uniqueId, bsIn);

    if (bsIn->length() == 0)  // read error
    {
      rc = NETWORK_ERROR;
      fWEClient->removeQueue(uniqueId);
      break;
    }
    else
    {
      *bsIn >> tmp8;
      rc = tmp8;

      if (rc != 0)
      {
        *bsIn >> errorMsg;
        fWEClient->removeQueue(uniqueId);
        break;
      }
      else
      {
        // get hwm info
        *bsIn >> errorMsg;
        BulkSetHWMArgs setHWMArgs;
        // cout << "received from WES bytestream length = " <<  bsIn->length() << endl;
        deserializeInlineVector(*(bsIn.get()), setHWMArgs);
        // cout << "get hwm info from WES size " << setHWMArgs.size() << endl;
        hwmArgsAllPms.push_back(setHWMArgs);
        msgRecived++;
      }
    }
  }

  if (rc != 0)
    return rc;

  // set hwm
  std::vector<BRM::BulkSetHWMArg> allHwm;
  BulkSetHWMArgs::const_iterator itor;

  // cout << "total hwmArgsAllPms size " << hwmArgsAllPms.size() << endl;
  for (unsigned i = 0; i < fWEClient->getPmCount(); i++)
  {
    itor = hwmArgsAllPms[i].begin();

    while (itor != hwmArgsAllPms[i].end())
    {
      allHwm.push_back(*itor);
      // cout << "received hwm info: " <<  itor->oid << ":" << itor->hwm << endl;
      itor++;
    }
  }

  // set CP data before hwm.

  // cout << "setting hwm allHwm size " << allHwm.size() << endl;
  BRM::CPInfoList_t cpInfos;

  std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
  rc = fDbrm->bulkSetHWMAndCP(allHwm, cpInfos, mergeCPDataArgs, txnID.id);
  fDbrm->takeSnapshot();
  // Set tablelock to rollforward remove meta files

  if (rc != 0)
    return rc;

  bool stateChanged = true;
  TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
  uint64_t tablelockId = tablelockData->getTablelockId(tableOid);

  try
  {
    stateChanged = fDbrm->changeState(tablelockId, BRM::CLEANUP);
  }
  catch (std::exception&)
  {
    errorMsg = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
    stateChanged = false;
  }

  if (!stateChanged)
    return rc;

  bytestream.restart();
  //@Bug 4517 Remove meta data failure doesn't stop tablelock releasing.
  bytestream << (ByteStream::byte)WE_SVR_BATCH_AUTOON_REMOVE_META;
  bytestream << uniqueId;
  bytestream << tableOid;
  msgRecived = 0;
  fWEClient->write_to_all(bytestream);

  while (1)
  {
    if (msgRecived == fWEClient->getPmCount())
      break;

    fWEClient->read(uniqueId, bsIn);

    if (bsIn->length() == 0)  // read error
    {
      fWEClient->removeQueue(uniqueId);
      break;
    }
    else
    {
      *bsIn >> tmp8;
      msgRecived++;
    }
  }

  return rc;
}

int DMLPackageProcessor::rollBackBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID,
                                                        uint32_t sessionID, const uint32_t tableOid,
                                                        std::string& errorMsg)
{
  // Bulkrollback, rollback blocks, vbrollback, change state, remove meta file
  // cout << "In rollBackBatchAutoOnTransaction" << endl;
  std::vector<BRM::TableLockInfo> tableLocks;
  tableLocks = fDbrm->getAllTableLocks();
  // cout << " Got all tablelocks" << endl;
  unsigned idx = 0;
  string ownerName("DMLProc batchinsert");
  uint64_t tableLockId = 0;
  int rc = 0;

  for (; idx < tableLocks.size(); idx++)
  {
    if ((tableLocks[idx].ownerName == ownerName) && (tableLocks[idx].tableOID == tableOid))
    {
      tableLockId = tableLocks[idx].id;
      break;
    }
  }

  if ((tableLockId == 0) || (tableOid == 0))
  {
    // table is not locked by DMLProc. Could happen if we failed to get lock
    // while inserting. Not an error during rollback, but we don't
    // want to do anything.
    return rc;
  }

  // cout << "sending to WES" << endl;
  ByteStream bytestream;
  fWEClient->addQueue(uniqueId);
  // cout << "adding queue id " << uniqueId << endl;
  bytestream << (ByteStream::byte)WE_SVR_ROLLBACK_BATCH_AUTO_ON;
  bytestream << uniqueId;
  bytestream << sessionID;
  bytestream << tableLockId;
  bytestream << tableOid;
  uint32_t msgRecived = 0;
  fWEClient->write_to_all(bytestream);
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  bsIn.reset(new ByteStream());
  ByteStream::byte tmp8;

  // cout << "waiting for reply from WES" << endl;
  while (1)
  {
    if (msgRecived == fWEClient->getPmCount())
      break;

    fWEClient->read(uniqueId, bsIn);

    if (bsIn->length() == 0)  // read error
    {
      rc = NETWORK_ERROR;
      fWEClient->removeQueue(uniqueId);
      // cout << "erroring out remove queue id " << uniqueId << endl;
      break;
    }
    else
    {
      *bsIn >> tmp8;
      rc = tmp8;

      if (rc != 0)
      {
        *bsIn >> errorMsg;
        fWEClient->removeQueue(uniqueId);
        // cout << "erroring out remove queue id " << uniqueId << endl;
        break;
      }
      else
        msgRecived++;
    }
  }

  if (rc == 0)  // change table lock state
  {
    bool stateChanged = true;

    // cout << "changing tablelock state" << endl;
    try
    {
      stateChanged = fDbrm->changeState(tableLockId, BRM::CLEANUP);
    }
    catch (std::exception&)
    {
      errorMsg = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
      stateChanged = false;
    }

    if (!stateChanged)
    {
      rc = 1;
    }
  }

  if (rc != 0)
    return rc;

  bytestream.restart();
  bytestream << (ByteStream::byte)WE_SVR_BATCH_AUTOON_REMOVE_META;
  bytestream << uniqueId;
  bytestream << tableOid;
  msgRecived = 0;
  fWEClient->write_to_all(bytestream);

  while (1)
  {
    if (msgRecived == fWEClient->getPmCount())
      break;

    fWEClient->read(uniqueId, bsIn);

    if (bsIn->length() == 0)  // read error
    {
      fWEClient->removeQueue(uniqueId);
      // cout << "erroring out remove queue id " << uniqueId << endl;
      break;
    }
    else
    {
      *bsIn >> tmp8;
      msgRecived++;
    }
  }

  fWEClient->removeQueue(uniqueId);
  return rc;
}

int DMLPackageProcessor::commitBatchAutoOffTransaction(uint64_t uniqueId, BRM::TxnID txnID,
                                                       const uint32_t tableOid, std::string& errorMsg)
{
  std::vector<BRM::TableLockInfo> tableLocks;
  tableLocks = fDbrm->getAllTableLocks();
  // cout << " Got all tablelocks" << endl;
  unsigned idx = 0;
  string ownerName("DMLProc batchinsert");
  uint64_t tableLockId = 0;
  int rc = 0;
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  bsIn.reset(new ByteStream());
  ByteStream::byte tmp8;

  for (; idx < tableLocks.size(); idx++)
  {
    if ((tableLocks[idx].ownerName == ownerName) && (tableLocks[idx].tableOID == tableOid))
    {
      tableLockId = tableLocks[idx].id;
      break;
    }
  }

  if ((tableLockId == 0) || (tableOid == 0))
  {
    // table is not locked by DMLProc. Could happen if we failed to get lock
    // while inserting. Not an error during rollback, but we don't
    // want to do anything.
    return rc;
  }

  bool stateChanged = true;

  // cout << "changing tablelock state" << endl;
  try
  {
    stateChanged = fDbrm->changeState(tableLockId, BRM::CLEANUP);
  }
  catch (std::exception&)
  {
    errorMsg = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
    stateChanged = false;
  }

  if (!stateChanged)
  {
    rc = 1;
  }

  if (rc != 0)
    return rc;

  ByteStream bytestream;
  fWEClient->addQueue(uniqueId);
  bytestream << (ByteStream::byte)WE_SVR_BATCH_AUTOON_REMOVE_META;
  bytestream << uniqueId;
  bytestream << tableOid;
  uint32_t msgRecived = 0;
  fWEClient->write_to_all(bytestream);

  while (1)
  {
    if (msgRecived == fWEClient->getPmCount())
      break;

    fWEClient->read(uniqueId, bsIn);

    if (bsIn->length() == 0)  // read error
    {
      fWEClient->removeQueue(uniqueId);
      break;
    }
    else
    {
      *bsIn >> tmp8;
      msgRecived++;
    }
  }

  fWEClient->removeQueue(uniqueId);
  return rc;
}

int DMLPackageProcessor::rollBackBatchAutoOffTransaction(uint64_t uniqueId, BRM::TxnID txnID,
                                                         uint32_t sessionID, const uint32_t tableOid,
                                                         std::string& errorMsg)
{
  ByteStream bytestream;
  fWEClient->addQueue(uniqueId);
  bytestream << (ByteStream::byte)WE_SVR_ROLLBACK_BATCH_AUTO_OFF;
  bytestream << uniqueId;
  bytestream << sessionID;
  bytestream << (uint32_t)txnID.id;
  bytestream << tableOid;
  uint32_t msgRecived = 0;
  fWEClient->write_to_all(bytestream);
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  bsIn.reset(new ByteStream());
  int rc = 0;
  ByteStream::byte tmp8;

  while (1)
  {
    if (msgRecived == fWEClient->getPmCount())
      break;

    fWEClient->read(uniqueId, bsIn);

    if (bsIn->length() == 0)  // read error
    {
      rc = NETWORK_ERROR;
      fWEClient->removeQueue(uniqueId);
      break;
    }
    else
    {
      *bsIn >> tmp8;
      rc = tmp8;

      if (rc != 0)
      {
        *bsIn >> errorMsg;
        fWEClient->removeQueue(uniqueId);
        break;
      }
      else
        msgRecived++;
    }
  }

  return rc;
}

int DMLPackageProcessor::flushDataFiles(int rcIn, std::map<FID, FID>& columnOids, uint64_t uniqueId,
                                        BRM::TxnID txnID, uint32_t tableOid)
{
  // cout <<"in flushDataFiles" << endl;
  ByteStream bytestream;
  bytestream << (ByteStream::byte)WE_SVR_FLUSH_FILES;
  bytestream << uniqueId;
  bytestream << (uint32_t)rcIn;
  bytestream << (uint32_t)txnID.id;
  bytestream << tableOid;
  uint32_t msgRecived = 0;
  fWEClient->write_to_all(bytestream);
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  bsIn.reset(new ByteStream());
  int rc = 0;
  ByteStream::byte tmp8;
  std::string errorMsg;

  try
  {
    while (1)
    {
      if (msgRecived == fWEClient->getPmCount())
        break;

      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        break;
      }
      else
      {
        *bsIn >> tmp8;
        rc = tmp8;

        if (rc != 0)
        {
          *bsIn >> errorMsg;
          break;
        }
        else
          msgRecived++;
      }
    }
  }
  catch (std::exception&)
  {
  }

  return rc;
}

int DMLPackageProcessor::endTransaction(uint64_t uniqueId, BRM::TxnID txnID, bool success)
{
  // cout <<"in flushDataFiles" << endl;
  ByteStream bytestream;
  bytestream << (ByteStream::byte)WE_END_TRANSACTION;
  bytestream << uniqueId;
  bytestream << (uint32_t)txnID.id;
  bytestream << (ByteStream::byte)success;
  uint32_t msgRecived = 0;
  fWEClient->write_to_all(bytestream);
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  bsIn.reset(new ByteStream());
  int rc = 0;
  ByteStream::byte tmp8;
  std::string errorMsg;

  try
  {
    while (1)
    {
      if (msgRecived == fWEClient->getPmCount())
        break;

      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        break;
      }
      else
      {
        *bsIn >> tmp8;
        rc = tmp8;

        if (rc != 0)
        {
          *bsIn >> errorMsg;
          break;
        }
        else
          msgRecived++;
      }
    }
  }
  catch (std::exception&)
  {
  }

  return rc;
}
}  // namespace dmlpackageprocessor
